package com.shujia;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class Demo4Filter {

    private Connection conn;
    private Admin admin;

    public void printRS(ResultScanner scanner) {
        for (Result rs : scanner) {
            String rowkey = Bytes.toString(rs.getRow());
            System.out.println("当前数据的rowkey为：" + rowkey);
            for (Cell cell : rs.listCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                byte[] valueBytes = CellUtil.cloneValue(cell);
                // 获取rowkey
//                String row = Bytes.toString(CellUtil.cloneRow(cell));

                String value;
                if ("age".equals(qualifier)) {
                    value = String.valueOf(Bytes.toInt(valueBytes));
                } else {
                    value = Bytes.toString(valueBytes);
                }
                System.out.println("当前获取的cell的列簇名为：" + family + ",列名为：" + qualifier + ",cell的值为：" + value);
            }
        }
    }

    @Before
    public void init() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");

        try {
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 通过RowFilter与 BinaryComparator 过滤比 rowKey 1500100010 小的所有值出来
    @Test
    public void BinaryComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        // 如果需要比较 两个值 = < > >= <= ...... 推荐使用BinaryComparator
        // 将需要比较的值 传入BinaryComparator的构造方法 创建BinaryComparator对象
        BinaryComparator binaryComparator = new BinaryComparator("1500100010".getBytes());

        // 指定操作符、比较器，构建作用在rowkey上的过滤器
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS, binaryComparator);


        Scan scan = new Scan();

        // 通过setFilter方法设置过滤器
        scan.setFilter(rowFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

    // 通过FamilyFilter与 BinaryPrefixComparator 过滤出列簇以info开头的所有列簇下的所有数据
    @Test
    public void BinaryPrefixComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        // 二进制前缀比较器
        BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("info".getBytes());
        // FamilyFilter 作用于列簇的过滤器
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);

        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(familyFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

    // 过滤出姓名以“张”开头的学生姓名
    // 通过ValueFilter与 BinaryPrefixComparator 过滤出列中以 张 开头的所有数据
    @Test
    public void BinaryPrefixComparatorValueFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        // 二进制前缀比较器
        BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("张".getBytes());
        ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
        Scan scan = new Scan();

//        scan.withStartRow("1500100001".getBytes());
//        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(valueFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);
    }

    // 过滤出 列的名字 中 包含 "am" 所有的列 及列的值
    @Test
    public void SubstringComparatorQualifierFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        SubstringComparator substringComparator = new SubstringComparator("am");
        // 作用在列名上的过滤器
        QualifierFilter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(qualifierFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

    // 过滤出文科的学生
    // 只会返回clazz列，其他列的数据不符合条件，不会返回
    @Test
    public void RegexStringComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        // 使用正则表达式比较器
        RegexStringComparator regexStringComparator = new RegexStringComparator("^文科.*");
        // ValueFilter 会返回符合条件的cell，并不会返回整条数据
        ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, regexStringComparator);

        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(valueFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);
    }

    // 过滤出文科的学生
    // 返回整条数据
    @Test
    public void SingleColumnValueFilterTest() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        RegexStringComparator regexStringComparator = new RegexStringComparator("^文科.*");

        // 会返回符合条件的cell所在的rowkey的整行数据
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "info".getBytes(),
                "clazz".getBytes(),
                CompareFilter.CompareOp.EQUAL,
                regexStringComparator
        );
        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(singleColumnValueFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

    // 过滤出文科的学生
    // 返回整条数据 如果不想打印clazz列 SingleColumnValueExcludeFilter
    @Test
    public void SingleColumnValueExcludeFilterTest() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        RegexStringComparator regexStringComparator = new RegexStringComparator("^文科.*");

        // 会返回符合条件的cell所在的rowkey的整行数据
        SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(
                "info".getBytes(),
                "clazz".getBytes(),
                CompareFilter.CompareOp.EQUAL,
                regexStringComparator
        );
        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(singleColumnValueExcludeFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

    //  通过PrefixFilter查询以150010008开头的所有前缀的rowkey
    @Test
    public void PrefixFilterTest() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        // rowkey前缀过滤器
        // BinaryPrefixComparator +  RowFilter
        PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
        Scan scan = new Scan();
        scan.setFilter(prefixFilter);
        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

    // 通过PageFilter查询第三页的数据，每页10条
    // 每次获取一页数据 需要从第一行扫描到 当前页 第一条数据，效率很低，不推荐使用
    // 可以通过合理的设计rowkey 来实现分页功能
    @Test
    public void PageFilterTest() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        int PageSize = 10;
        int PageNum = 3;
        Scan scan = new Scan();
        if (PageNum == 1) {
            PageFilter pageFilter = new PageFilter(PageSize);
            scan.withStartRow("".getBytes());
            scan.setFilter(pageFilter);
            ResultScanner scanner = students.getScanner(scan);
            printRS(scanner);
        } else if (PageNum > 1) {
            // 先找到PageNum页的startRow
            int scanDS = (PageNum - 1) * PageSize + 1;
            // 取 scanDS 条数  21
            PageFilter pageFilter = new PageFilter(scanDS);
            scan.setFilter(pageFilter);
            ResultScanner scanner = students.getScanner(scan);
            String rowkey = "";
            // 获取结果中最后一条数据的rowkey
            for (Result rs : scanner) {
                rowkey = Bytes.toString(rs.getRow());
            }
            scan.withStartRow(rowkey.getBytes());
            // 去PageSize条数据 10条
            PageFilter pageFilter2 = new PageFilter(PageSize);
            scan.setFilter(pageFilter2);
            ResultScanner scanner1 = students.getScanner(scan);

            printRS(scanner1);

        }


    }

    @Test
    // 通过合理的设置rowkey来实现分页功能，提高效率
    public void PageFilterTest2() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        int PageSize = 10;
        int PageNum = 3;

        int baseId = 1500100000;
        int start_row = baseId + (PageNum - 1) * PageSize + 1;
        int end_row = start_row + PageSize;
        Scan scan = new Scan();
        scan.withStartRow(String.valueOf(start_row).getBytes());
        scan.withStopRow(String.valueOf(end_row).getBytes());

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);


    }

    //  查询文科班中的学生中学号以150010008开头并且年龄小于23的学生信息
    //  RegexStringComparator 过滤出文科班的学生
    //  BinaryPrefixComparator 或 PrefixFilter 以150010008开头
    //  BinaryComparator 年龄小于23
    //  SingleColumnValueFilter
    //  FilterList
    @Test
    public void FilterListFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
                "info".getBytes(),
                "clazz".getBytes(),
                CompareFilter.CompareOp.EQUAL,
                new RegexStringComparator("^文科.*")
        );
        PrefixFilter filter2 = new PrefixFilter("150010008".getBytes());
        SingleColumnValueFilter filter3 = new SingleColumnValueFilter(
                "info".getBytes(),
                "age".getBytes(),
                CompareFilter.CompareOp.LESS,
                new BinaryComparator(Bytes.toBytes(23))
        );
        // 将多个过滤条件组装
        FilterList filterList = new FilterList();
        filterList.addFilter(filter1);
        filterList.addFilter(filter2);
        filterList.addFilter(filter3);
        Scan scan = new Scan();
        scan.setFilter(filterList);

        ResultScanner scanner = students.getScanner(scan);
        printRS(scanner);

    }


    @Test
    public void putAll() throws IOException {
        // 先用tableExists判断表是否存在，不存在则创建，存在则跳过
        if (!admin.tableExists(TableName.valueOf("students"))) {
            HTableDescriptor students = new HTableDescriptor(TableName.valueOf("students"));
            students.addFamily(new HColumnDescriptor("info"));
            admin.createTable(students);
        }

        BufferedReader bufferedReader = new BufferedReader(new FileReader("data/students.txt"));
        String line = bufferedReader.readLine();

        // 获取table
        Table students = conn.getTable(TableName.valueOf("students"));

        while (line != null) {
            // 写HBase
            String[] splits = line.split(",");
            String id = splits[0];
            String name = splits[1];
            int age = Integer.valueOf(splits[2]);
            String gender = splits[3];
            String clazz = splits[4];

            String rowkey = gender + "_" + id;
            // 以性别+id作为rowkey
            Put put = new Put(rowkey.getBytes());
            put.addColumn("info".getBytes(), "name".getBytes(), name.getBytes());
//            // int 类型的数据无法使用getBytes方法
            put.addColumn("info".getBytes(), "age".getBytes(), Bytes.toBytes(age));
            put.addColumn("info".getBytes(), "gender".getBytes(), gender.getBytes());
            put.addColumn("info".getBytes(), "clazz".getBytes(), clazz.getBytes());

            students.put(put);
            // 读取下一条数据
            line = bufferedReader.readLine();
        }


    }


    @Test
    // 通过二级索引快速筛选出students表中gender为男的学生信息
    // 二级索引：建立行键与rowkey的映射关系

    public void getMaleInfo() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("男".getBytes());
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);

        Scan scan = new Scan();
        scan.setFilter(rowFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

    @After
    public void close() {
        try {
            if (conn != null) {
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
